本文为您介绍CSV格式的使用方法和类型映射。
背景信息
CSV格式允许基于CSV结构读写CSV数据。当前,CSV结构是基于表结构推导而来的。支持CSV格式的连接器包括:消息队列Kafka、Upsert Kafka、消息队列RocketMQ、StarRocks和对象存储OSS。
使用示例
利用Kafka以及CSV格式构建表的示例如下。
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
);
配置选项
选项 | 是否必选 | 默认值 | 类型 | 说明 |
format | 是 | 无 | String | 声明使用的格式。使用CSV格式时,参数取值为csv。 |
csv.field-delimiter | 否 | , | String | 指定字段分隔符,仅可使用单字符,默认为逗号(,)。 参数取值类型如下:
|
csv.disable-quote-character | 否 | false | Boolean | 是否允许引用值是否引号,参数取值如下:
|
csv.quote-character | 否 | " | String | 指定引用值的引号字符,默认为双引号(")。 |
csv.allow-comments | 否 | false | Boolean | 是否允许忽略注释行,参数取值如下:
|
csv.ignore-parse-errors | 否 | false | Boolean | 当解析异常时,是否允许跳过当前字段,参数取值如下:
|
csv.array-element-delimiter | 否 | ; | String | 指定分隔数组和行元素的字符串,默认为分号(;)。 |
csv.escape-character | 否 | 无 | String | 指定转义字符,默认禁用。 |
csv.null-literal | 否 | 无 | String | 指定识别成 null 值的字符串,默认禁用。在输入端会将该字符串转为null值,在输出端会将null值转成该字符串。 |
csv.write-bigdecimal-in-scientific-notation | 否 | true | Boolean | 是否将Bigdecimal类型的数据表示为科学计数法,参数取值如下:
|
类型映射
在Flink中,CSV的格式数据使用jackson databind API解析CSV字符串。Flink与CSV的数据类型的映射关系如下。
Flink SQL类型 | CSV类型 |
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
INTERVAL | number |
ARRAY | array |
ROW | object |
其他使用说明
对于写入对象存储OSS,目前暂不支持写入CSV格式的文件,具体原因请参见FLINK-30635。